
[SPARK-13801][SQL] DataFrame.col should return unresolved attribute #11632

Closed
wants to merge 16 commits

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

Let's start with an example:

val df = ...
val df2 = df.filter(...)
df.join(df2, (df("key") + 1) === df2("key"))

This query doesn't work and returns a wrong result, because df("key") and df2("key") reference the same column.

I think the biggest problem is that we give users the resolved attribute. However, a resolved attribute is not a real column, as a logical plan's output may change. For example, we generate new output attributes for the right child in a self-join.

We should make Dataset.col return an unresolved attribute. We still do the resolution to make sure the given column name is resolvable, but instead of returning the resolved attribute, we take its name and wrap it in an UnresolvedAttribute.

We should also alias every Dataset ever generated with a globally unique name, so that Dataset.col returns an unresolved attribute with a unique qualifier, and our framework can use it to reference the correct column.

When we alias a Dataset, we should clean out previously generated aliases, to avoid making the plan tree too big.

Now if users run the query again, it works.

There are 3 exceptional cases in which we should not alias:

  1. Dataset.as: this is obvious; we can't override a user-provided alias.
  2. Dataset.toDF(): this just turns a strongly typed Dataset into a generic DataFrame without creating a new one, so there is no need to alias.
  3. join: join is special in that it may have same-name outputs with different qualifiers, so we can't alias it, as that would replace the original qualifiers and make the same-name outputs indistinguishable. We can't clean out previously generated aliases either, as that would erase the original qualifiers.
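The core idea of the proposed Dataset.col can be sketched as follows. This is only an illustration, not the actual patch: `resolve`, `uniqueAlias`, and the surrounding plumbing are hypothetical names standing in for whatever the implementation uses.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

def col(colName: String): Column = {
  // Resolution still happens, so a bad column name fails fast ...
  val resolved = resolve(colName) // hypothetical helper; throws if colName is not resolvable
  // ... but only the *name* is kept, wrapped in an UnresolvedAttribute that is
  // qualified by this Dataset's globally unique alias. The analyzer can then
  // resolve it against the correct side of a self-join.
  new Column(UnresolvedAttribute(Seq(uniqueAlias, resolved.name)))
}
```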

However, if users manually introduce ambiguity, this PR will break existing code, for example:

val df1 = ....  // schema: [id, name]
val df2 = ....  // schema: [id, age]
val left = df1.as("same")
val right = df2.as("same")
left.join(right, left("id") === right("id"))

It will fail analysis after this PR, because the 2 id columns have the same qualifier.

This PR also breaks some corner cases that worked before, but I think it is reasonable to break them:

val df = ...
val df1 = df.filter(...)
df1.select(df("key"))

It will throw an exception now, because we can't resolve df("key") against df1. I think this is reasonable: df and df1 are different Datasets, and it's odd to support selecting a column taken from a different Dataset. Another case:

val df = ...
df.join(df, df("key") === df("key"))

It doesn't make sense to support self-join this way; users should use df.join(df, "key") instead. This trick is also very fragile: it stops working as soon as we apply an operation to the column, e.g. df.join(df, (df("key") + 1) === df("key")).
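For reference, both recommended forms of such a self-join are standard Spark API (assuming a SparkSession with its implicits in scope):

```scala
import org.apache.spark.sql.functions.col

// Using-column form: joins on "key" without any ambiguous references.
df.join(df, "key")

// For non-trivial conditions, alias each side distinctly and qualify
// the columns through the aliases.
df.as("l").join(df.as("r"), (col("l.key") + 1) === col("r.key"))
```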

How was this patch tested?

Existing tests.

@@ -115,7 +115,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
}

-  test("[SPARK-6231] join - self join auto resolve ambiguity") {
+  ignore("[SPARK-6231] join - self join auto resolve ambiguity") {
Contributor Author

With this change, I'm not sure this "auto resolve ambiguity" feature is useful anymore.

@cloud-fan
Contributor Author

cc @rxin @marmbrus @yhuai @liancheng

@SparkQA

SparkQA commented Mar 10, 2016

Test build #52827 has finished for PR 11632 at commit 9642324.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor

The major problem I see is that now any two DataFrames must be aliased before being joined, which breaks existing user code. However, this PR does solve a more common and subtle problem that is hard to debug, especially when the join condition is not as trivial as df("i") === df2("i"); our existing strategy only issues an intuitive exception message for trivial comparisons.

@gatorsmile
Member

I like the idea. If we do not change the current behavior, the error we issue on name ambiguity is just confusing; maybe documenting it would be enough. More importantly, though, the current behavior can generate a wrong result. See the example:

val df1 = Seq((1, 3), (2, 1)).toDF("keyCol1", "keyCol2")
val df2 = Seq((1, 4), (2, 1)).toDF("keyCol1", "keyCol3")
val df3 = df1.join(df2, df1("keyCol1") === df2("keyCol1")).select(df1("keyCol1"), $"keyCol3")
df3.join(df1, df3("keyCol3") === df1("keyCol1")).show()

The above query returns an empty result set.
However, the correct result should be (2,1,1,3).
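One way to express this query unambiguously with the existing API is to alias every input and qualify each column reference through the aliases (a sketch assuming spark.implicits._ is imported for toDF):

```scala
import org.apache.spark.sql.functions.col

val df1 = Seq((1, 3), (2, 1)).toDF("keyCol1", "keyCol2")
val df2 = Seq((1, 4), (2, 1)).toDF("keyCol1", "keyCol3")
val df3 = df1.as("a")
  .join(df2.as("b"), col("a.keyCol1") === col("b.keyCol1"))
  .select(col("a.keyCol1"), col("b.keyCol3"))
// Re-alias before the second join so df1's keyCol1 cannot be confused
// with the keyCol1 inside df3.
df3.as("c").join(df1.as("d"), col("c.keyCol3") === col("d.keyCol1")).show()
```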

@cloud-fan cloud-fan force-pushed the df-self-join branch 2 times, most recently from 5eb1b5d to 2a0122e Compare March 12, 2016 15:40
@SparkQA

SparkQA commented Mar 12, 2016

Test build #53006 has finished for PR 11632 at commit 2a0122e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 13, 2016

Test build #53032 has finished for PR 11632 at commit d892584.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 14, 2016

Test build #53056 has finished for PR 11632 at commit d258b47.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 14, 2016

Test build #53069 has finished for PR 11632 at commit 3c77c5a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 14, 2016

Test build #53073 has finished for PR 11632 at commit f3df4f6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 15, 2016

Test build #53196 has finished for PR 11632 at commit 5557806.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53261 has finished for PR 11632 at commit 6faf40e.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53288 has finished for PR 11632 at commit f0b653e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53671 has finished for PR 11632 at commit 387628f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53672 has finished for PR 11632 at commit 9a5ce19.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

cc @rxin @marmbrus , it's ready for review :)

@SparkQA

SparkQA commented Mar 23, 2016

Test build #53891 has finished for PR 11632 at commit fcebfb6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 23, 2016

Test build #53894 has finished for PR 11632 at commit c1a26f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 23, 2016

Test build #53899 has finished for PR 11632 at commit 7963244.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 23, 2016

Test build #53914 has finished for PR 11632 at commit 94d2641.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Mar 23, 2016

Great work :) The user's intention in the example query seems obvious to some extent, so I wonder if we could automatically fix this kind of name ambiguity in an Analyzer phase? Throwing an exception to make users eliminate the ambiguity is one solution, though it slightly sacrifices the simplicity of user code.

@aray
Contributor

aray commented Mar 23, 2016

While this may help with join ambiguity, I think the more fundamental problem is that a transformed DataFrame should not give the same column references as the original. For an example where this is still a problem even with this patch, see @wesm's recent blog post: http://wesmckinney.com/blog/compiling-dataframe-code/

@cloud-fan
Contributor Author

cc @maropu @aray, yeah, we can do better on this problem; this PR is just step 1. We can alias every DataFrame so that a transformed DataFrame won't give the same column references as the original, but that requires more changes and I'd like to do it in follow-ups.

@SparkQA

SparkQA commented Mar 25, 2016

Test build #54134 has finished for PR 11632 at commit 249526b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaChiSqSelectorExample
    • public class JavaCorrelationsExample
    • public class JavaElementwiseProductExample
    • public class JavaHypothesisTestingExample
    • public class JavaHypothesisTestingKolmogorovSmirnovTestExample
    • public class JavaKernelDensityEstimationExample
    • public class JavaStratifiedSamplingExample
    • public class JavaSummaryStatisticsExample
    • class MultilayerPerceptronClassificationModelWriter(
    • class TypeConverters(object):
    • probabilityCol = Param(Params._dummy(), "probabilityCol", "Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.", typeConverter=TypeConverters.toString)
    • thresholds = Param(Params._dummy(), "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.", typeConverter=TypeConverters.toListFloat)
    • public final class XXH64
    • abstract class HashExpression[E] extends Expression
    • abstract class InterpretedHashFunction
    • case class Murmur3Hash(children: Seq[Expression], seed: Int) extends HashExpression[Int]
    • case class XxHash64(children: Seq[Expression], seed: Long) extends HashExpression[Long]
    • final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator
    • class FileStreamSink(
    • class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging
    • class HDFSBackedStateStore(val version: Long, mapToUpdate: MapType)
    • case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
    • trait StateStore
    • trait StateStoreProvider
    • case class ValueAdded(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
    • case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
    • case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
    • class StateStoreRDD[T: ClassTag, U: ClassTag](
    • implicit class StateStoreOps[T: ClassTag](dataRDD: RDD[T])

@SparkQA

SparkQA commented Mar 28, 2016

Test build #54313 has finished for PR 11632 at commit 97bfd34.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 28, 2016

Test build #54315 has finished for PR 11632 at commit 97bfd34.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 28, 2016

Test build #54317 has finished for PR 11632 at commit 619d34c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 28, 2016

Test build #54316 has finished for PR 11632 at commit 619d34c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 28, 2016

Test build #54321 has finished for PR 11632 at commit 86a8877.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 29, 2016

Test build #54403 has finished for PR 11632 at commit a1822da.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor

Shall we close this?


7 participants